package day05;

import beans.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.Objects;
import java.util.regex.Pattern;

/**
 * Flink Table API and SQL: user-defined table function example.
 * <p>
 * Reads sensor records from a text file, converts the stream into a table and
 * applies a custom table function that splits the {@code id} column, once via
 * the Table API and once via SQL.
 * <p>
 * Alibaba Cloud documentation: https://help.aliyun.com/document_detail/69559.html
 *
 * @author lvbingbing
 * @date 2022-01-22 11:01
 */
public class FlinkTableApi10 {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int parallelism = 1;
        env.setParallelism(parallelism);
        // 2. Read the raw lines from the file and parse each one into a SensorReading.
        DataStream<SensorReading> dataStreamSource = env.readTextFile("input/sensor.txt")
                .map(FlinkTableApi10::parseSensorReading);
        // 3. Create the table environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 4. Convert the data stream into a table, renaming two of the fields.
        Table dataTable = tableEnv.fromDataStream(dataStreamSource, "id, timestamp as ts, temperature as temp");
        // 5. Run the user-defined table function examples.
        studyUserDefinedTableFunction(dataTable, tableEnv);
        // 6. Trigger job execution.
        env.execute();
    }

    /**
     * Parses one comma-separated line of the form {@code id,timestamp,temperature}.
     *
     * @param line raw line read from the input file
     * @return the parsed sensor reading
     * @throws IllegalArgumentException if the line has fewer than three fields
     * @throws NumberFormatException    if the timestamp or temperature is not numeric
     */
    private static SensorReading parseSensorReading(String line) {
        String[] fields = line.split(",");
        if (fields.length < 3) {
            // Fail with the offending content instead of a bare ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException(
                    "Malformed sensor record, expected 'id,timestamp,temperature': " + line);
        }
        // valueOf replaces the deprecated boxed-type constructors (new Long / new Double).
        return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
    }

    /**
     * Registers the custom table function and uses it to split {@code id},
     * producing (word, length) rows through both the Table API and SQL.
     * Both results are converted to append streams and printed.
     *
     * @param dataTable source table containing the {@code id} and {@code ts} columns
     * @param tableEnv  table environment used for registration and queries
     */
    private static void studyUserDefinedTableFunction(Table dataTable, StreamTableEnvironment tableEnv) {
        // 1. Create the table function instance; ids are split on "_".
        SplitTableFunction splitTableFunction = new SplitTableFunction("_");
        // 2. Register the table function under the name used by the queries below.
        tableEnv.registerFunction("tableFunctionSplit", splitTableFunction);
        // 3. Table API: lateral join against the function output.
        Table resTable = dataTable.joinLateral("tableFunctionSplit(id) as (word, length)")
                .select("id, ts, word, length");
        DataStream<Row> rowDataStream = tableEnv.toAppendStream(resTable, Row.class);
        rowDataStream.print("resTable");
        // 4. SQL: the same query expressed with LATERAL TABLE.
        tableEnv.createTemporaryView("sensor", dataTable);
        String sql = "select id, ts, word, length from sensor, lateral table(tableFunctionSplit(id)) as split_id(word, length)";
        Table sqlQueryTable = tableEnv.sqlQuery(sql);
        DataStream<Row> rowDataStream1 = tableEnv.toAppendStream(sqlQueryTable, Row.class);
        rowDataStream1.print("sqlQueryRes");
    }

    /**
     * Table function that splits a string on a separator and emits one
     * (word, length) tuple per resulting piece.
     */
    public static class SplitTableFunction extends TableFunction<Tuple2<String, Integer>> {

        /**
         * Separator compiled as a literal pattern, so characters such as
         * {@code |} or {@code .} are matched verbatim rather than as regex syntax.
         * Compiled once here instead of on every {@link #eval(String)} call.
         */
        private final Pattern separatorPattern;

        /**
         * Creates a function that splits on the given separator.
         *
         * @param separator literal separator text; must not be null
         * @throws NullPointerException if {@code separator} is null
         */
        public SplitTableFunction(String separator) {
            Objects.requireNonNull(separator, "separator must not be null");
            this.separatorPattern = Pattern.compile(separator, Pattern.LITERAL);
        }

        /**
         * Creates a function that splits on a comma, the default separator.
         */
        public SplitTableFunction() {
            this(",");
        }

        /**
         * Evaluation method required for a table function. It returns nothing;
         * each output row is emitted through {@code collect}.
         *
         * @param str the string to split; a null value emits no rows
         */
        public void eval(String str) {
            if (str == null) {
                // A null column value has nothing to split; emit no rows instead of throwing.
                return;
            }
            for (String word : separatorPattern.split(str)) {
                collect(new Tuple2<>(word, word.length()));
            }
        }
    }
}